/*

Copyright (C) SYSTAP, LLC DBA Blazegraph 2006-2016.  All rights reserved.

Contact:
     SYSTAP, LLC DBA Blazegraph
     2501 Calvert ST NW #106
     Washington, DC 20008
     licenses@blazegraph.com

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

*/
package com.bigdata.sparse;

import java.text.RuleBasedCollator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;

import com.bigdata.bfs.BigdataFileSystem;
import com.bigdata.btree.AbstractBTree;
import com.bigdata.btree.IIndex;
import com.bigdata.btree.IRangeQuery;
import com.bigdata.btree.ITuple;
import com.bigdata.btree.IndexMetadata;
import com.bigdata.btree.keys.CollatorEnum;
import com.bigdata.btree.keys.IKeyBuilder;
import com.bigdata.journal.IIndexManager;
import com.bigdata.journal.ITimestampService;
import com.bigdata.journal.Journal;
import com.bigdata.journal.TimestampUtility;
import com.bigdata.rdf.store.AbstractTripleStore;
import com.bigdata.relation.RelationSchema;
import com.bigdata.service.ndx.IClientIndex;

import cutthecrap.utils.striterators.Resolver;
import cutthecrap.utils.striterators.Striterator;

/**
 * A client-side class that knows how to use an {@link IIndex} to provide an
 * efficient data model in which a logical row is stored as one or more entries
 * in the {@link IIndex}. Operations are provided for atomic read and write of
 * a logical row. While the scan operations are always consistent (they will
 * never reveal data from a row that is undergoing concurrent modification),
 * they do NOT
 * cause concurrent atomic row writes to block. This means that rows that would
 * be visited by a scan MAY be modified before the scan reaches those rows and
 * the client will see the updates.
 * <p>
 * The {@link SparseRowStore} requires that you declare the {@link KeyType} for
 * the primary key so that it may impose a consistent total ordering over the
 * generated keys in the index.
 * <p>
 * There is no intrinsic reason why column values must be strongly typed.
 * Therefore, by default column values are loosely typed. However, column values
 * MAY be constrained by a {@link Schema}.
 * <p>
 * This class builds keys using the sparse row store design pattern. Each
 * logical row is modeled as an ordered set of index entries whose keys are
 * formed as:
 * </p>
 * 
 * <pre>
 *                                             
 * [schemaName][primaryKey][columnName][timestamp]
 *                                             
 * </pre>
 * 
 * <p>
 * 
 * and the value of each index entry is the value for the named column for
 * that primary key.
 * 
 * </p>
 * 
 * <p>
 * 
 * Timestamps are either generated by the application, in which case they define
 * the semantics of a write-write conflict, or assigned by the index on write,
 * in which case write-write conflicts never arise. Regardless of how timestamps
 * are generated, the use of the timestamp in the <em>key</em> requires that
 * applications specify filters that are applied during row scans to limit the
 * data points actually returned as part of the row. For example, a filter might
 * return only the most recent column value no later than a given timestamp for
 * each column of some primary key.
 * 
 * </p>
 * 
 * <p>
 * 
 * For example, a record with the following columns
 * 
 * <ul>
 * <li>Id</li>
 * <li>Name</li>
 * <li>Employer</li>
 * <li>DateOfHire</li>
 * </ul>
 * 
 * would be represented as a series of index entries as follows:
 * 
 * </p>
 * 
 * <pre>
 *                                             
 * [employee][12][DateOfHire][t0] : [4/30/02]
 * [employee][12][DateOfHire][t1] : [4/30/05]
 * [employee][12][Employer][t0]   : [SAIC]
 * [employee][12][Employer][t1]   : [SYSTAP]
 * [employee][12][Id][t0]         : [12]
 * [employee][12][Name][t0]       : [Bryan Thompson]
 *                                             
 * </pre>
 * 
 * <p>
 * 
 * In order to read the logical row whose last update was <code>t0</code>,
 * the caller would specify <code>t0</code> as the <i>toTime</i> of interest.
 * The values read in this example would be {&lt;DateOfHire, t0, 4/30/02&gt;,
 * &lt;Employer, t0, SAIC&gt;, &lt;Id, t0, 12&gt;, &lt;Name, t0, Bryan
 * Thompson&gt;}.
 * </p>
 * <p>
 * Likewise, in order to read the logical row whose last update was
 * <code>t1</code>, the caller would specify <code>t1</code> as the
 * <i>toTime</i> of interest. The values read in this example would be
 * {&lt;DateOfHire, t1, 4/30/05&gt;, &lt;Employer, t1, SYSTAP&gt;, &lt;Id, t0,
 * 12&gt;, &lt;Name, t0, Bryan Thompson&gt;}. Notice that values written at
 * <code>t0</code> and not overwritten or deleted by <code>t1</code> are still
 * present in the resulting logical row.
 * </p>
 * <p>
 * Note: Very large objects should be stored in the {@link BigdataFileSystem}
 * (distributed, atomic, versioned, chunked file system) and the identifier for
 * that object can then be stored in the row store.
 * </p>
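 * <p>
 * For example, a minimal client-side usage sketch, assuming an existing
 * {@link Schema} instance named <code>employee</code> whose primary key is the
 * <code>Id</code> column and a {@link SparseRowStore} variable named
 * <code>rowStore</code> (both names are placeholders):
 * </p>
 * 
 * <pre>
 * // Atomic write of a logical row (the primary key MUST be present).
 * final Map&lt;String, Object&gt; row = new HashMap&lt;String, Object&gt;();
 * row.put("Id", Long.valueOf(12));
 * row.put("Name", "Bryan Thompson");
 * row.put("Employer", "SYSTAP");
 * rowStore.write(employee, row);
 * 
 * // Atomic read of the most current state of that logical row.
 * final Map&lt;String, Object&gt; current = rowStore.read(employee, Long.valueOf(12));
 * </pre>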
 * 
 * @author <a href="mailto:thompsonbry@users.sourceforge.net">Bryan Thompson</a>
 * @version $Id$
 * 
 * FIXME write a REST service using JSON to interchange data with the
 * {@link SparseRowStore}. A caching layer in the web app could be used to
 * reduce any hotspots.
 */
public class SparseRowStore implements IRowStoreConstants {

    protected static final Logger log = Logger.getLogger(SparseRowStore.class);

//    /**
//     * True iff the {@link #log} level is INFO or less.
//     */
//    final protected boolean INFO = log.isInfoEnabled();
//
//    /**
//     * True iff the {@link #log} level is DEBUG or less.
//     */
//    final protected boolean DEBUG = log.isDebugEnabled();

    static final String UTF8 = "UTF-8";
    
    private final IIndex ndx;

    /**
     * The backing index.
     */
    public IIndex getIndex() {
        
        return ndx;
        
    }

    /**
     * Create a client-side abstraction that treats an {@link IIndex} as a
     * {@link SparseRowStore}.
     * <p>
     * Note: When creating the backing index you MUST specify the split handler
     * to ensure that dynamic sharding does not break logical rows, e.g.:
     * 
     * <pre>
     * md.setSplitHandler(LogicalRowSplitHandler.INSTANCE);
     * </pre>
     * 
     * Note: The JDK {@link RuleBasedCollator} embeds <code>nul</code> bytes in
     * its Unicode sort keys. This makes them unsuitable for the row store,
     * which cannot locate the start of the column name if there are embedded
     * <code>nul</code>s in the primaryKey. Therefore, if you are using
     * {@link CollatorEnum#JDK} as your default collator, then you MUST override
     * the {@link IndexMetadata} for the row store to use either an ASCII
     * collator or the ICU collator. In general, the ICU collator is superior to
     * the JDK collator and will be used by default. The ASCII collator is not
     * ideal since non-ASCII distinctions will be lost, but it is better than
     * being unable to decode the data in the row store.
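     * <p>
     * A minimal sketch of registering a suitable index and wrapping it as a
     * {@link SparseRowStore} (the index name, the <code>indexManager</code>
     * reference, and the use of <code>ITx.UNISOLATED</code> here are
     * placeholder assumptions):
     * 
     * <pre>
     * final IndexMetadata md = new IndexMetadata("myRowStore", UUID.randomUUID());
     * md.setSplitHandler(LogicalRowSplitHandler.INSTANCE);
     * indexManager.registerIndex(md);
     * final IIndex ndx = indexManager.getIndex("myRowStore", ITx.UNISOLATED);
     * final SparseRowStore rowStore = new SparseRowStore(ndx);
     * </pre>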
     * 
     * @param ndx
     *            The index.
     */
    public SparseRowStore(final IIndex ndx) {

        if (ndx == null)
            throw new IllegalArgumentException();

        this.ndx = ndx;
        
    }

//    /**
//     * Used to encode and decode tuples for the {@link SparseRowStore} index.
//     * Each tuple corresponds to a {@link ITPV timestamped property value} for
//     * some {@link Schema}.
//     * 
//     * @todo there needs to be some way to lookup the {@link Schema} from the
//     *       schema name as encoded in the key. One possibility is to register
//     *       the known {@link Schema} against a static factory. Another is to
//     *       have the known {@link Schema} registered in the
//     *       {@link IndexMetadata} for the index backing the
//     *       {@link SparseRowStore} (much like an extSer integration). The
//     *       schema can be resolved using its encoded bytes as the key and the
//     *       Unicode text of the schema name can be persisted in the
//     *       {@link Schema}'s data.  See the TPSTupleSerializer also.
//     * 
//     * @author <a href="mailto:thompsonbry@users.sourceforge.net">Bryan Thompson</a>
//     * @version $Id$
//     */
//    public class TPVTupleSerializer implements ITupleSerializer {
//
//        /**
//         * De-serialization ctor.
//         */
//        public TPVTupleSerializer() {
//            
//        }
//        
//        public byte[] serializeKey(TPV t) {
//            
//            if(t == null) throw new IllegalArgumentException();
//            
//            IKeyBuilder keyBuilder = getKeyBuilderFactory().getKeyBuilder();
//            
//            final byte[] key = t.getSchema().getKey(keyBuilder, t.primaryKey, t.getName(), t.getTimestamp());
//            
//            return key;
//        }
//
//        /**
//         * De-serializes as much of the key as possible.
//         * 
//         * @see KeyDecoder
//         */
//        public KeyDecoder deserializeKey(ITuple tuple) {
//
//            return new KeyDecoder(tuple.getKey());
//            
//        }
//
//        public byte[] serializeVal(TPV t) {
//
//            return ValueType.encode(t.getValue());
//            
//        }
//        
//        public ITPV deserialize(ITuple tuple) {
//            
//            final KeyDecoder keyDecoder = new KeyDecoder(tuple.getKey());
//
//            final Schema schema = resolveSchema(keyDecoder.getSchemaBytes());
//
//            final Object value = ValueType.decode(tuple.getValue());
//
//            final TPV t = new TPV(schema, keyDecoder.getColumnName(), keyDecoder
//                    .getTimestamp(), value);
//            
//            return t;
//            
//        }
//
//    }
    
    /**
     * Verifies the given arguments.
     */
    final static void assertArgs(final Schema schema, final Object primaryKey,
            final long fromTime, final long toTime) {
        
        if (schema == null)
            throw new IllegalArgumentException("schema");
        
        if (primaryKey == null)
            throw new IllegalArgumentException("primaryKey");

        if (fromTime == CURRENT_ROW) {
            
            throw new IllegalArgumentException(
                    "fromTime MAY NOT be 'CURRENT_ROW'");
            
        }
        
        if (fromTime < MIN_TIMESTAMP) {

            throw new IllegalArgumentException("fromTime less than MIN_TIMESTAMP");
            
        }
        
        if (toTime != CURRENT_ROW) {

            if (fromTime >= toTime) {

                throw new IllegalArgumentException("from/to time out of order");
                
            }
            
        }
        
    }
    
    /**
     * Verifies the writeTime.
     * 
     * @param writeTime
     *            The caller-specified write timestamp.
     */
    final static void assertWriteTime(long writeTime) {
        
        if (writeTime == AUTO_TIMESTAMP)
            return;

        if (writeTime == AUTO_TIMESTAMP_UNIQUE)
            return;
        
        if (writeTime < MIN_TIMESTAMP)
            throw new IllegalArgumentException();
        
    }
    
    /**
     * Validates the column name productions.
     */
    final static void assertPropertyNames(final Map<String, Object> propertySet) {

        if (propertySet == null)
            throw new IllegalArgumentException();
        
        final Iterator<String> itr = propertySet.keySet().iterator();

        while (itr.hasNext()) {

            final String col = itr.next();

            // validate the column name production.
            NameChecker.assertColumnName(col);

        }

    }
    
    /**
     * Return the current binding for the named property.
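     * <p>
     * For example (the <code>rowStore</code> and <code>employee</code>
     * variables are placeholders):
     * 
     * <pre>
     * final Object employer = rowStore.get(employee, Long.valueOf(12), "Employer");
     * </pre>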
     * 
     * @param schema
     *            The {@link Schema} governing the logical row.
     * @param primaryKey
     *            The primary key that identifies the logical row.
     * @param name
     *            The property name.
     * @return The current binding -or- <code>null</code> iff the property is
     *         not bound.
     * 
     * @todo this can be optimized and should use its own stored procedure. See
     *       {@link AbstractAtomicRowReadOrWrite#getCurrentValue(IIndex, Schema, Object, String)}
     */
    public Object get(final Schema schema, final Object primaryKey, final String name) {

        final TPS tps = (TPS) read(schema, primaryKey, MIN_TIMESTAMP,
                CURRENT_ROW, new SingleColumnFilter(name));

        if (tps == null) {

            return null;
            
        }
        
        return tps.get(name).getValue();
        
    }
    
    /**
     * Read the most recent logical row from the index.
     * 
     * @param schema
     *            The {@link Schema} governing the logical row.
     * @param primaryKey
     *            The primary key that identifies the logical row.
     * 
     * @return The data for the current state of that logical row -or-
     *         <code>null</code> IFF there are no property values for that
     *         logical row (including no deleted property values, no property
     *         values that are excluded due to their timestamps, and no property
     *         values that are excluded due to a property name filter). A
     *         <code>null</code> return is a strong guarantee that NO data
     *         existed in the row store at the time of the read for the given
     *         <i>schema</i> and <i>primaryKey</i>.
     */
    public Map<String, Object> read(final Schema schema, final Object primaryKey) {

        final TPS tps = (TPS) read(schema, primaryKey, MIN_TIMESTAMP,
                CURRENT_ROW, null/* filter */);

        if (tps == null) {

            return null;

        }

        return tps.asMap();

    }

    /**
     * Read the most recent logical row from the index.
     * 
     * @param schema
     *            The {@link Schema} governing the logical row.
     * @param primaryKey
     *            The primary key that identifies the logical row.
     * @param filter
     *            An optional filter.
     * 
     * @return The data for the current state of that logical row -or-
     *         <code>null</code> IFF there are no property values for that
     *         logical row (including no deleted property values, no property
     *         values that are excluded due to their timestamps, and no property
     *         values that are excluded due to a property name filter). A
     *         <code>null</code> return is a strong guarantee that NO data
     *         existed in the row store at the time of the read for the given
     *         <i>schema</i> and <i>primaryKey</i>.
     */
    public Map<String, Object> read(final Schema schema,
            final Object primaryKey, final INameFilter filter) {

        final TPS tps = (TPS) read(schema, primaryKey, MIN_TIMESTAMP,
                CURRENT_ROW, filter);

        if (tps == null) {

            return null;

        }

        return tps.asMap();

    }

    /**
     * Read a logical row from the index.
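     * <p>
     * For example, a sketch of reading only the current binding for a single
     * property (the <code>rowStore</code> and <code>employee</code> variables
     * are placeholders):
     * 
     * <pre>
     * final ITPS tps = rowStore.read(employee, Long.valueOf(12), MIN_TIMESTAMP,
     *         CURRENT_ROW, new SingleColumnFilter("Employer"));
     * </pre>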
     * 
     * @param schema
     *            The {@link Schema} governing the logical row.
     * @param primaryKey
     *            The primary key that identifies the logical row.
     * @param fromTime
     *            The first timestamp for which timestamped property values will
     *            be accepted.
     * @param toTime
     *            The first timestamp for which timestamped property values will
     *            NOT be accepted -or- {@link IRowStoreConstants#CURRENT_ROW} to
     *            accept only the most current binding whose timestamp is GTE
     *            <i>fromTime</i>.
     * @param filter
     *            An optional filter that may be used to select values for
     *            property names accepted by the filter.
     * 
     * @return The data in that row -or- <code>null</code> IFF there are no
     *         property values for that logical row (including no deleted
     *         property values, no property values that are excluded due to
     *         their timestamps, and no property values that are excluded due to
     *         a property name filter). A <code>null</code> return is a strong
     *         guarantee that NO data existed in the row store at the time of
     *         the read for the given <i>schema</i> and <i>primaryKey</i>.
     * 
     * @throws IllegalArgumentException
     *             if the <i>schema</i> is <code>null</code>.
     * @throws IllegalArgumentException
     *             if the <i>primaryKey</i> is <code>null</code>.
     * @throws IllegalArgumentException
     *             if the <i>fromTime</i> and/or <i>toTime</i> are invalid.
     * 
     * @see ITimestampPropertySet#asMap(), return the most current bindings.
     * @see ITimestampPropertySet#asMap(long), return the most current bindings
     *      as of the specified timestamp.
     * 
     * @see IRowStoreConstants#CURRENT_ROW
     * @see IRowStoreConstants#MIN_TIMESTAMP
     * @see IRowStoreConstants#MAX_TIMESTAMP
     */
    public ITPS read(final Schema schema, final Object primaryKey,
            final long fromTime, final long toTime, final INameFilter filter) {

        assertArgs(schema, primaryKey, fromTime, toTime);

        if (log.isInfoEnabled()) {
            String ts = "N/A";
            if (getIndex() instanceof IClientIndex) {
                ts = TimestampUtility.toString(((IClientIndex) getIndex())
                        .getTimestamp());
            } else if (getIndex() instanceof AbstractBTree) {
                ts = TimestampUtility.toString(((AbstractBTree) getIndex())
                        .getLastCommitTime());
            }
            log.info("ts=" + ts + ", schema=" + schema.getName()
                    + ", primaryKey=" + primaryKey + ", fromTime=" + fromTime
                    + ", toTime=" + toTime + ", filter="
                    + (filter == null ? "N/A" : filter.getClass().getName()));
        }
        
        final AtomicRowRead proc = new AtomicRowRead(schema, primaryKey,
                fromTime, toTime, filter);
        
        final byte[] key = schema.fromKey(
                ndx.getIndexMetadata().getKeyBuilder(), primaryKey).getKey();

        // Submit the atomic read operation.
        return (TPS) ndx.submit(key, proc);

    }
    
    /**
     * Atomic write with atomic read-back of the post-update state of the
     * logical row.
     * <p>
     * Note: In order to cause a column value for a row to be deleted you MUST
     * specify a <code>null</code> column value for that column.
     * <p>
     * Note: the value of the <i>primaryKey</i> is written each time the
     * logical row is updated, and the timestamp associated with the value for
     * the <i>primaryKey</i> property tells you the timestamp of each row
     * revision.
     * 
     * @param schema
     *            The {@link Schema} governing the logical row.
     * 
     * @param propertySet
     *            The column names and values for that row.
     * 
     * @return The result of an atomic read on the post-update state of the
     *         logical row. Only the most current bindings will be present for
     *         each property.
     */
    public Map<String, Object> write(final Schema schema,
            final Map<String, Object> propertySet) {

        return write(schema, propertySet, AUTO_TIMESTAMP_UNIQUE, null/* filter */,
                null/* precondition */).asMap();
        
    }

    /**
     * Atomic write with atomic read-back of the post-update state of the
     * logical row.
     * 
     * @param schema
     *            The {@link Schema} governing the logical row.
     * @param propertySet
     *            The column names and values for that row.
     * @param writeTime
     *            The timestamp to use for the row -or-
     *            {@link IRowStoreConstants#AUTO_TIMESTAMP} if the timestamp
     *            will be generated by the server -or-
     *            {@link IRowStoreConstants#AUTO_TIMESTAMP_UNIQUE} if a
     *            federation-wide unique timestamp will be generated by the
     *            server.
     * 
     * @return The result of an atomic read on the post-update state of the
     *         logical row. Only the most current bindings will be present for
     *         each property.
     */
    public Map<String, Object> write(final Schema schema,
            final Map<String, Object> propertySet, final long writeTime) {

        return write(schema, propertySet, writeTime, null/* filter */, null/* precondition */)
                .asMap();
        
    }

    /**
     * Atomic write with atomic read of the then current post-condition state of
     * the logical row.
     * <p>
     * Note: In order to cause a column value for a row to be deleted you MUST
     * specify a <code>null</code> column value for that column. A
     * <code>null</code> will be written under the key for the column value
     * with a new timestamp. This is interpreted as a deleted property value
     * when the row is simplified as a {@link Map}. If you examine the
     * {@link ITPS} you can see the {@link ITPV} with the <code>null</code>
     * value and the timestamp of the delete.
     * <p>
     * Note: the value of the <i>primaryKey</i> is written each time the
     * logical row is updated, and the timestamp associated with the value for
     * the <i>primaryKey</i> property tells you the timestamp of each row
     * revision.
     * <p>
     * Note: If the caller specified a <i>timestamp</i>, then that timestamp is
     * used by the atomic read. If the timestamp was assigned by the server,
     * then the server-assigned timestamp is used by the atomic read.
     * <p>
     * Note: You can verify pre-conditions for the logical row on the server.
     * Among other things this could be used to reject an update if someone has
     * modified the logical row since you last read some value.
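     * <p>
     * For example, a sketch of deleting a single column value by writing a
     * <code>null</code> binding for that column (the <code>rowStore</code> and
     * <code>employee</code> variables are placeholders):
     * 
     * <pre>
     * final Map&lt;String, Object&gt; update = new HashMap&lt;String, Object&gt;();
     * update.put("Id", Long.valueOf(12)); // the primary key MUST be present.
     * update.put("Employer", null); // a null binding deletes the column value.
     * 
     * final TPS tps = rowStore.write(employee, update, AUTO_TIMESTAMP_UNIQUE,
     *         null, null);
     * </pre>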
     * 
     * @param schema
     *            The {@link Schema} governing the logical row.
     * @param propertySet
     *            The column names and values for that row. The primaryKey as
     *            identified by the {@link Schema} MUST be present in the
     *            <i>propertySet</i>.
     * @param writeTime
     *            The timestamp to use for the row -or-
     *            {@link IRowStoreConstants#AUTO_TIMESTAMP} if the timestamp
     *            will be generated by the server -or-
     *            {@link IRowStoreConstants#AUTO_TIMESTAMP_UNIQUE} if a
     *            federation-wide unique timestamp will be generated by the
     *            server.
     * @param filter
     *            An optional filter used to select the property values that
     *            will be returned (this has no effect on the atomic write).
     * @param precondition
     *            When present, the pre-condition state of the row will be read
     *            and offered to the {@link IPrecondition}. If the
     *            {@link IPrecondition} fails, then the atomic write will NOT be
     *            performed and the pre-condition state of the row will be
     *            returned. If the {@link IPrecondition} succeeds, then the
     *            atomic write will be performed and the post-condition state of
     *            the row will be returned. Use {@link TPS#isPreconditionOk()}
     *            to determine whether or not the write was performed.
     * 
     * @return The result of an atomic read on the post-update state of the
     *         logical row -or- <code>null</code> iff there is no data for the
     *         <i>primaryKey</i> (per the contract for an atomic read).
     *         <p>
     *         If an optional {@link IPrecondition} was specified and the
     *         {@link IPrecondition} was NOT satisfied, then the write
     *         operation was NOT performed and the result is the pre-condition
     *         state of the logical row (which, again, will be <code>null</code>
     *         IFF there is NO data for the <i>primaryKey</i>).
     * 
     * @see ITPS#getWriteTimestamp()
     */
    public TPS write(final Schema schema,
            final Map<String, Object> propertySet, final long writeTime,
            final INameFilter filter, final IPrecondition precondition) {

        return write(schema, propertySet, MIN_TIMESTAMP, CURRENT_ROW,
                writeTime, filter, precondition);

    }
    
    /**
     * Atomic write with atomic read of the post-condition state of the logical
     * row.
     * <p>
     * Note: In order to cause a column value for a row to be deleted you MUST
     * specify a <code>null</code> column value for that column. A
     * <code>null</code> will be written under the key for the column value
     * with a new timestamp. This is interpreted as a deleted property value
     * when the row is simplified as a {@link Map}. If you examine the
     * {@link ITPS} you can see the {@link ITPV} with the <code>null</code>
     * value and the timestamp of the delete.
     * <p>
     * Note: the value of the <i>primaryKey</i> is written each time the
     * logical row is updated, and the timestamp associated with the value for
     * the <i>primaryKey</i> property tells you the timestamp of each row
     * revision.
     * <p>
     * Note: If the caller specified a <i>timestamp</i>, then that timestamp is
     * used by the atomic read. If the timestamp was assigned by the server,
     * then the server-assigned timestamp is used by the atomic read.
     * <p>
     * Note: You can verify pre-conditions for the logical row on the server.
     * Among other things this could be used to reject an update if someone has
     * modified the logical row since you last read some value.
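     * <p>
     * For example, a sketch of a conditional update (the <code>update</code>
     * map and the <code>precondition</code> instance are assumed to already
     * exist; see {@link IPrecondition}):
     * 
     * <pre>
     * final TPS tps = rowStore.write(employee, update, MIN_TIMESTAMP,
     *         CURRENT_ROW, AUTO_TIMESTAMP_UNIQUE, null, precondition);
     * 
     * if (tps != null &amp;&amp; !tps.isPreconditionOk()) {
     *     // The write was NOT performed; tps reflects the pre-condition state.
     * }
     * </pre>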
     * 
     * @param schema
     *            The {@link Schema} governing the logical row.
     * @param propertySet
     *            The column names and values for that row. The primaryKey as
     *            identified by the {@link Schema} MUST be present in the
     *            <i>propertySet</i>.
     * @param fromTime
     *            <em>During pre-condition and post-condition reads</em>, the
     *            first timestamp for which timestamped property values will be
     *            accepted.
     * @param toTime
     *            <em>During pre-condition and post-condition reads</em>, the
     *            first timestamp for which timestamped property values will NOT
     *            be accepted -or- {@link IRowStoreConstants#CURRENT_ROW} to
     *            accept only the most current binding whose timestamp is GTE
     *            <i>fromTime</i>.
     * @param writeTime
     *            The timestamp to use for the row -or-
     *            {@link IRowStoreConstants#AUTO_TIMESTAMP} if the timestamp
     *            will be generated by the server -or-
     *            {@link IRowStoreConstants#AUTO_TIMESTAMP_UNIQUE} if a
     *            federation-wide unique timestamp will be generated by the
     *            server.
     * @param filter
     *            An optional filter used to select the property values that
     *            will be returned (this has no effect on the atomic write).
     * @param precondition
     *            When present, the pre-condition state of the row will be read
     *            and offered to the {@link IPrecondition}. If the
     *            {@link IPrecondition} fails, then the atomic write will NOT be
     *            performed and the pre-condition state of the row will be
     *            returned. If the {@link IPrecondition} succeeds, then the
     *            atomic write will be performed and the post-condition state of
     *            the row will be returned. Use {@link TPS#isPreconditionOk()}
     *            to determine whether or not the write was performed.
     * 
     * @return The result of an atomic read on the post-update state of the
     *         logical row, which will be <code>null</code> IFF there is NO
     *         data for the <i>primaryKey</i>.
     *         <p>
     *         If an optional {@link IPrecondition} was specified and the
     *         {@link IPrecondition} was NOT satisfied, then the write
     *         operation was NOT performed and the result is the pre-condition
     *         state of the logical row (which, again, will be <code>null</code>
     *         IFF there is NO data for the <i>primaryKey</i>).
     * 
     * @throws UnsupportedOperationException
     *             if a property has an auto-increment type and the
     *             {@link ValueType} of the property does not support
     *             auto-increment.
     * @throws UnsupportedOperationException
     *             if a property has an auto-increment type but there is no
     *             successor in the value space of that property.
     * 
     * @see ITPS#getWriteTimestamp()
     * 
     * @todo the atomic read back may be overkill. When you need the data it
     *       means that you only do one RPC rather than two. When you do not
     *       need the data it is just more network traffic and more complexity
     *       in this method signature. You can get pretty much the same result
     *       by doing an atomic read after the fact using the timestamp assigned
     *       by the server to the row (pretty much in the sense that it is
     *       possible for another write to explicitly specify the same timestamp
     *       and hence overwrite your data).
     * 
     * @todo the timestamp could be an {@link ITimestampService} with an
     *       implementation that always returns a caller-given constant, another
     *       that uses the local system clock, another that uses the system
     *       clock but ensures that it never hands off the same timestamp twice
     *       in a row, and another that resolves the global timestamp service.
     *       <p>
     *       It is also possible that the timestamp behavior should be defined
     *       by the {@link Schema} and therefore factored out of this method
     *       signature.
     */
    public TPS write(final Schema schema,
            final Map<String, Object> propertySet, final long fromTime,
            final long toTime, final long writeTime, final INameFilter filter,
            final IPrecondition precondition) {
        
        // check before extracting the primary key.
        if (schema == null)
            throw new IllegalArgumentException();

        // check before extracting the primary key.
        if (propertySet == null)
            throw new IllegalArgumentException();

        // extract the primary key.
        final Object primaryKey = propertySet.get(schema.getPrimaryKeyName());

        // verify args.
        assertArgs(schema, primaryKey, fromTime, toTime);
        
        if (log.isInfoEnabled())
            log.info("schema=" + schema.getName() + ", primaryKey="
                    + primaryKey + ", timestamp=" + writeTime + ", filter="
                    + (filter == null ? "N/A" : filter.getClass().getName())+
                    ", precondition="
                    + (precondition == null ? "N/A" : precondition.getClass()
                            .getName()));

        final AtomicRowWriteRead proc = new AtomicRowWriteRead(schema,
                propertySet, fromTime, toTime, writeTime, filter, precondition);

        final byte[] key = schema.fromKey(
                ndx.getIndexMetadata().getKeyBuilder(), primaryKey).getKey();

        return (TPS) ndx.submit(key, proc);

    }

    /**
     * Atomic delete of all property values for the current logical row.
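     * <p>
     * For example (the <code>rowStore</code> and <code>employee</code>
     * variables are placeholders):
     * 
     * <pre>
     * final ITPS deleted = rowStore.delete(employee, Long.valueOf(12));
     * </pre>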
     * 
     * @param schema
     *            The schema.
     * @param primaryKey
     *            The primary key for the logical row.
     * 
     * @return The deleted property values.
     */
    public ITPS delete(Schema schema, Object primaryKey) {

        return delete(schema, primaryKey, MIN_TIMESTAMP, CURRENT_ROW,
                AUTO_TIMESTAMP_UNIQUE, null/* filter */);
        
    }
    
    /**
     * Atomic delete of all property values for the logical row. The property
     * values are read atomically, each property value that is read is then
     * overwritten with a <code>null</code>, and the read property values are
     * returned.
     * 
     * @param schema
     *            The schema.
     * @param primaryKey
     *            The primary key for the logical row.
     * @param fromTime
     *            <em>During pre-condition and post-condition reads</em>, the
     *            first timestamp for which timestamped property values will be
     *            accepted.
     * @param toTime
     *            <em>During pre-condition and post-condition reads</em>, the
     *            first timestamp for which timestamped property values will NOT
     *            be accepted -or- {@link IRowStoreConstants#CURRENT_ROW} to
     *            accept only the most current binding whose timestamp is GTE
     *            <i>fromTime</i>.
     * @param writeTime
     *            The timestamp that will be written into the "deleted" entries
     *            -or- {@link IRowStoreConstants#AUTO_TIMESTAMP} if the
     *            timestamp will be generated by the server -or-
     *            {@link IRowStoreConstants#AUTO_TIMESTAMP_UNIQUE} if a
     *            federation-wide unique timestamp will be generated by the
     *            server.
     * @param filter
     *            An optional filter used to select the property values that
     *            will be deleted.
     * 
     * @return The property values that were read from the store before they
     *         were deleted. The {@link ITPS#getWriteTimestamp()} will report
     *         the timestamp assigned to the deleted entries used to overwrite
     *         these property values in the store.
     * 
     * @todo add optional {@link IPrecondition}.
     * 
     * @todo unit tests.
     */
    public ITPS delete(final Schema schema, Object primaryKey,
            final long fromTime, final long toTime, final long writeTime,
            final INameFilter filter) {

        assertArgs(schema, primaryKey, fromTime, toTime);
        
        if (log.isInfoEnabled())
            log.info("schema=" + schema + ", primaryKey=" + primaryKey
                    + ", timestamp=" + writeTime + ", filter="
                    + (filter == null ? "N/A" : filter.getClass().getName()));
            
        final AtomicRowDelete proc = new AtomicRowDelete(schema, primaryKey,
                fromTime, toTime, writeTime, filter);
        
        final byte[] key = schema.fromKey(
                ndx.getIndexMetadata().getKeyBuilder(), primaryKey).getKey();

        return (TPS) ndx.submit(key, proc);

    }
    
    /**
     * A logical row scan. Each logical row will be read atomically. Only the
     * current bindings for property values will be returned.
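     * <p>
     * For example, a sketch of visiting the current state of every logical row
     * for a schema (the <code>rowStore</code> and <code>employee</code>
     * variables are placeholders):
     * 
     * <pre>
     * final Iterator&lt;? extends ITPS&gt; itr = rowStore.rangeIterator(employee);
     * 
     * while (itr.hasNext()) {
     * 
     *     final ITPS row = itr.next();
     * 
     *     System.out.println(row.getPrimaryKey() + " : " + row.asMap());
     * 
     * }
     * </pre>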
     * 
     * @param schema
     *            The {@link Schema} governing the logical row.
     * 
     * @return An iterator visiting each logical row in the specified key range.
     */
    public Iterator<? extends ITPS> rangeIterator(final Schema schema) {

        return rangeIterator(schema, null/* fromKey */, null/* toKey */,
                0/* capacity */, MIN_TIMESTAMP, CURRENT_ROW, null/* filter */);

    }

    /**
     * A logical row scan. Each logical row will be read atomically. Only the
     * current bindings for property values will be returned.
     * 
     * @param schema
     *            The {@link Schema} governing the logical row.
     * @param fromKey
     *            The value of the primary key for the lower bound (inclusive)
     *            of the key range -or- <code>null</code> iff there is no lower
     *            bound.
     * @param toKey
     *            The value of the primary key for the upper bound (exclusive)
     *            of the key range -or- <code>null</code> iff there is no upper
     *            bound.
     * 
     * @return An iterator visiting each logical row in the specified key range.
     */
    public Iterator<? extends ITPS> rangeIterator(final Schema schema,
            final Object fromKey, final Object toKey) {

        return rangeIterator(schema, fromKey, toKey, 0/* capacity */,
                MIN_TIMESTAMP, CURRENT_ROW, null/* filter */);

    }

    /**
     * A logical row scan. Each logical row will be read atomically. Only the
     * current bindings for property values will be returned.
     * 
     * @param schema
     *            The {@link Schema} governing the logical row.
     * @param fromKey
     *            The value of the primary key for the lower bound (inclusive)
     *            of the key range -or- <code>null</code> iff there is no lower
     *            bound.
     * @param toKey
     *            The value of the primary key for the upper bound (exclusive)
     *            of the key range -or- <code>null</code> iff there is no upper
     *            bound.
     * @param filter
     *            An optional filter.
     * 
     * @return An iterator visiting each logical row in the specified key range.
     */
    public Iterator<? extends ITPS> rangeIterator(final Schema schema,
            final Object fromKey, final Object toKey, final INameFilter filter) {

        return rangeIterator(schema, fromKey, toKey, 0/* capacity */,
                MIN_TIMESTAMP, CURRENT_ROW, filter);

    }

    /**
     * A logical row scan. Each logical row will be read atomically.
     * 
     * @param schema
     *            The {@link Schema} governing the logical row.
     * @param fromKey
     *            The value of the primary key for the lower bound (inclusive)
     *            of the key range -or- <code>null</code> iff there is no lower
     *            bound.
     * @param toKey
     *            The value of the primary key for the upper bound (exclusive)
     *            of the key range -or- <code>null</code> iff there is no upper
     *            bound.
     * @param capacity
     *            When non-zero, this is the maximum #of logical rows that will
     *            be read atomically. This is only an upper bound. The actual
     *            #of logical rows in an atomic read depends on a variety of
     *            factors.
     * @param fromTime
     *            The first timestamp for which timestamped property values will
     *            be accepted.
     * @param toTime
     *            The first timestamp for which timestamped property values will
     *            NOT be accepted -or- {@link IRowStoreConstants#CURRENT_ROW} to
     *            accept only the most current binding whose timestamp is GTE
     *            <i>fromTime</i>.
     * @param nameFilter
     *            An optional filter used to select the properties of interest.
     * 
     * @return An iterator visiting each logical row in the specified key range.
     */
    @SuppressWarnings("unchecked")
    public Iterator<? extends ITPS> rangeIterator(final Schema schema,
            Object fromKey, Object toKey, final int capacity,
            final long fromTime, final long toTime, final INameFilter nameFilter) {

        assertArgs(schema, Boolean.TRUE/* fake */, fromTime, toTime);
        
        if (log.isInfoEnabled())
            log.info("schema="
                    + schema
                    + ", fromKey="
                    + fromKey
                    + ", toKey="
                    + toKey
                    + ", capacity="
                    + capacity
                    + ", fromTime="
                    + fromTime
                    + ", toTime="
                    + toTime
                    + ", filter="
                    + (nameFilter == null ? "N/A" : nameFilter.getClass()
                            .getName()));
        
        final IKeyBuilder keyBuilder = ndx.getIndexMetadata().getKeyBuilder();

        if (fromKey != null) {

            // convert to an unsigned byte[].
            fromKey = schema.fromKey(keyBuilder, fromKey).getKey();
            
        }

        if (toKey != null) {

            // convert to an unsigned byte[].
            toKey = schema.fromKey(keyBuilder, toKey).getKey();

        }

        /*
         * If the primary key type has a fixed length (int, long, etc), then the
         * successor for continuation queries must be formed by adding one to
         * the last key visited. Otherwise an unsigned nul byte is appended
         * (ASCII, Unicode).
         */
        final boolean fixedLengthSuccessor = schema.getPrimaryKeyType().isFixedLength();
        
        final int flags = IRangeQuery.DEFAULT
                | IRangeQuery.READONLY
                | (fixedLengthSuccessor ? IRangeQuery.FIXED_LENGTH_SUCCESSOR
                        : 0);
        
        /*
         * Setup an iterator that visits the timestamp-property-value tuples and
         * a filter that aggregates logical rows into chunks.
         */
        
        return new Striterator(ndx.rangeIterator(//
                (byte[]) fromKey, //
                (byte[]) toKey, //
                capacity, // max #of rows to fetch at a time.
                flags, // 
                new AtomicRowFilter(schema, fromTime, toTime, nameFilter)))
                .addFilter(new Resolver() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    protected Object resolve(Object obj) {
                        // resolve visited TPS from tuple.
                        final ITuple<TPS> tuple = (ITuple<TPS>) obj;
                        if (log.isInfoEnabled()) {
                            log.info("resolving TPS: " + tuple.getVisitCount());
                        }
                        return tuple.getObject();
                    }
                });
        
    }

    /**
     * Options for the {@link SparseRowStore}.
     * 
     * @author <a href="mailto:thompsonbry@users.sourceforge.net">Bryan
     *         Thompson</a>
     */
    public interface Options {

        /**
         * The schema name was originally written using a Unicode sort key.
         * However, the JDK can generate Unicode sort keys with embedded nuls
         * which in turn will break the logic to detect the end of the schema
         * name in the key. In order to accommodate this behavior, the schema
         * name is now encoded as UTF8 which also has the advantage that we can
         * decode the schema name. Standard prefix compression on the B+Tree
         * should make up for the larger representation of the schema name in
         * the B+Tree.
         * <p>
         * This change was introduced on 7/29/2010 in the trunk. When this
         * property is <code>true</code> it breaks compatibility with earlier
         * revisions of the {@link SparseRowStore}. This flag may be set to
         * <code>false</code> for backward compatibility.
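         * <p>
         * For example, backward compatibility may be requested via a JVM
         * system property (a sketch; the property name is derived from the
         * {@link Schema} class name):
         * 
         * <pre>
         * -Dcom.bigdata.sparse.Schema.schemaName.unicodeClean=false
         * </pre>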
         * 
         * @see #DEFAULT_SCHEMA_NAME_UNICODE_CLEAN
         */
        String SCHEMA_NAME_UNICODE_CLEAN = Schema.class.getName()
                + ".schemaName.unicodeClean";

        /**
         * @see https://sourceforge.net/apps/trac/bigdata/ticket/107
         */
        String DEFAULT_SCHEMA_NAME_UNICODE_CLEAN = "true";
        
        /**
         * The primary key was originally written using a Unicode sort key.
         * However, the JDK generates Unicode sort keys with embedded nuls and
         * that broke the logic to detect the end of the Unicode primary keys.
         * In order to accommodate this behavior, the Unicode primary key is now
         * encoded as UTF8 which also has the advantage that we can decode
         * Unicode primary keys. Standard prefix compression on the B+Tree
         * should make up for the larger representation of the Unicode primary
         * key in the B+Tree.
         * <p>
         * This change was introduced on 7/15/2010 in the trunk and breaks
         * compatibility with earlier revisions of the {@link SparseRowStore}.
         * This flag may be set to <code>false</code> for backward
         * compatibility.
         * 
         * @see Options#DEFAULT_PRIMARY_KEY_UNICODE_CLEAN
         */
        String PRIMARY_KEY_UNICODE_CLEAN = Schema.class.getName()
                + ".primaryKey.unicodeClean";

        /**
         * @see https://sourceforge.net/apps/trac/bigdata/ticket/107
         */
        String DEFAULT_PRIMARY_KEY_UNICODE_CLEAN = "true";

    }

    /**
     * This is a global option since it was always <code>false</code> for
     * historical stores.
     * 
     * @see Options#SCHEMA_NAME_UNICODE_CLEAN
     */
    final static transient boolean schemaNameUnicodeClean = Boolean
            .valueOf(System.getProperty(
                    SparseRowStore.Options.SCHEMA_NAME_UNICODE_CLEAN,
                    SparseRowStore.Options.DEFAULT_SCHEMA_NAME_UNICODE_CLEAN));

    /**
     * This is a global option since it was always <code>false</code> for
     * historical stores.
     * 
     * @see Options#PRIMARY_KEY_UNICODE_CLEAN
     */
    final static transient boolean primaryKeyUnicodeClean = Boolean
            .valueOf(System.getProperty(
                    SparseRowStore.Options.PRIMARY_KEY_UNICODE_CLEAN,
                    SparseRowStore.Options.DEFAULT_PRIMARY_KEY_UNICODE_CLEAN));

    /**
     * Return the list of namespaces defined in the row store.
     * 
     * @param tx
     *            The transaction identifier -or- a <code>timestamp</code> if
     *            the {@link IIndexManager} is not a {@link Journal}.
     * 
     * @return The list of namespaces.
     */
	public List<String> getNamespaces(long tx) {
		// the triple store namespaces.
		final List<String> namespaces = new LinkedList<String>();

		// scan the relation schema in the global row store.
		@SuppressWarnings("unchecked")
		final Iterator<ITPS> itr = (Iterator<ITPS>)
				rangeIterator(RelationSchema.INSTANCE);

		while (itr.hasNext()) {

			// A timestamped property value set is a logical row with
			// timestamped property values.
			final ITPS tps = itr.next();

			// If you want to see what is in the TPS, uncomment this.
			// System.err.println(tps.toString());

			// The namespace is the primary key of the logical row for the
			// relation schema.
			final String namespace = (String) tps.getPrimaryKey();

			// Get the name of the implementation class
			// (AbstractTripleStore, SPORelation, LexiconRelation, etc.)
			final String className = (String) tps.get(RelationSchema.CLASS)
					.getValue();

			if (className == null) {
				// Skip deleted triple store entry.
				continue;
			}

			try {
				final Class<?> cls = Class.forName(className);
				if (AbstractTripleStore.class.isAssignableFrom(cls)) {
					// this is a triple store (vs something else).
					namespaces.add(namespace);
				}
			} catch (ClassNotFoundException e) {
				log.error(e, e);
			}

		}

		return namespaces;
	}

}
